#!/usr/bin/env python3
#
# OpenSlide, a library for reading whole slide image files
#
# Copyright (c) 2012-2015 Carnegie Mellon University
# Copyright (c) 2015-2023 Benjamin Gilbert
# All rights reserved.
#
# OpenSlide is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, version 2.1.
#
# OpenSlide is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with OpenSlide. If not, see
# <http://www.gnu.org/licenses/>.
#

from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Iterator, Sequence
from concurrent.futures import ThreadPoolExecutor
from configparser import RawConfigParser
from contextlib import closing, contextmanager
import errno
import filecmp
import fnmatch
from functools import cached_property, lru_cache
from hashlib import sha256
import inspect
from lzma import LZMACompressor
import os
from pathlib import Path, PurePath
import platform
import re
import shlex
from shutil import copytree, rmtree
import socket
import subprocess
import sys
import tarfile
from tempfile import (
    NamedTemporaryFile,
    TemporaryDirectory,
    TemporaryFile,
    mkdtemp,
)
import textwrap
from threading import Thread
from time import time as curtime
from typing import (
    Any,
    BinaryIO,
    NewType,
    Protocol,
    cast,
)
from urllib.parse import urljoin
from zipfile import ZipFile

import requests
import yaml

# Base URL that slide relative paths are joined onto when downloading;
# overridable through the OPENSLIDE_TESTDATA_URL environment variable.
TESTDATA_URL = os.getenv(
    'OPENSLIDE_TESTDATA_URL',
    'https://openslide.cs.cmu.edu/download/openslide-testdata/',
)
DEFAULT_FROZEN_BUCKET = 'openslide-frozen-testdata'
# NOTE(review): the @NAME@ tokens below look like placeholders substituted
# when this file is generated by the build system -- confirm.
SRCDIR = Path(r'@SRCDIR@')
BUILDDIR = Path(r'@BUILDDIR@')
# Suppression files passed to LeakSanitizer (via LSAN_OPTIONS) and Valgrind
CLANG_LSAN_SUPPRESSIONS = SRCDIR / 'clang-lsan.supp'
VALGRIND_SUPPRESSIONS = SRCDIR / 'valgrind.supp'
VALGRIND_SUPPRESSIONS_GLIB = Path(
    r'@GLIB2_DATADIR@/glib-2.0/valgrind/glib.supp'
)
# Directory holding one subdirectory per test case, plus the list files
CASEROOT = SRCDIR / 'cases'
SLIDELIST = CASEROOT / 'slides.yaml'
FROZENLIST = CASEROOT / 'frozen.yaml'
MOSAICLIST = CASEROOT / 'mosaic.ini'
# Root of all downloaded/unpacked data; overridable through the
# OPENSLIDE_TEST_CACHE environment variable.
CACHE = Path(os.getenv('OPENSLIDE_TEST_CACHE', r'@BUILDDIR@/_slidedata'))
CACHE_TAG = CACHE / 'CACHEDIR.TAG'
# Unpacked test cases, one directory per test name
WORKROOT = CACHE / 'unpacked'
# Pristine base slides, one directory per slide relative path
PRISTINE = CACHE / 'pristine'
FUSEMOUNT = CACHE / 'fuse'
FROZENBASE = CACHE
FROZEN = CACHE / 'frozen'
# Whitespace-separated feature names; compared against each test's
# 'requires' list
FEATURES = set('@FEATURES@'.split())
# Name of the per-test configuration file inside CASEROOT/<test>
TESTCONF = 'config.yaml'

# ANSI escape sequences used by _color()
GREEN = '\033[1;32m'
BLUE = '\033[1;34m'
RED = '\033[1;31m'
RESET = '\033[1;0m'

# Registry filled in by the @_command decorator: command names in
# registration order, and a map from each name to its function.
_commands = []
_command_funcs = {}

# Shared HTTP session for downloads.  Its User-Agent is our own identifier
# followed by the default requests User-Agent.
http = requests.Session()
http.headers.update(
    {
        'User-Agent': 'OpenSlide/@VERSION@ (driver) '
        + requests.utils.default_headers()['User-Agent']
    }
)


class Skip:
    '''Sentinel type.  The class object itself is passed as an argument
    value where None already has another meaning.'''


class ConnectionInterrupted(Exception):
    '''Raised when a download times out or ends short.'''


class Command(Protocol):
    '''Call signature of a user command: any number of string arguments,
    no return value.'''

    def __call__(self, *args: str) -> None:
        ...


# mypy has no apparent way to accept a function of k string parameters
# where a variadic function of strings is expected, so f is typed as Any
# and cast to Command.
def _command(f: Any) -> Command:
    '''Decorator registering f as a user command under its own name.'''
    name = f.__name__
    command = cast(Command, f)
    _commands.append(name)
    _command_funcs[name] = command
    return command


def _color(color: str, str: str) -> str:
    '''Wrap str in the given ANSI color escape sequence, followed by the
    reset sequence.'''
    return ''.join([color, str, RESET])


def _cpu_count() -> int:
    # Does not account for cgroup CPU limits, which might be lower
    # https://github.com/python/cpython/issues/80235
    try:
        # Linux
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # macOS
        return os.cpu_count() or 1


class BaseSlide:
    '''A pristine slide from openslide-testdata.  It may or may not have
    been downloaded into the filesystem yet.'''

    def __init__(self, relpath: PurePath):
        known = self._get_digests()
        if relpath not in known:
            raise ValueError(f'{relpath} not in {SLIDELIST}')

        self.relpath = relpath
        self.digest = known[relpath]
        self.is_zip = relpath.suffix == '.zip'
        # directory inside PRISTINE that contains the slide's files
        self.path = PRISTINE / relpath

    @staticmethod
    @lru_cache
    def _get_digests() -> dict[PurePath, str]:
        '''Load SLIDELIST and return a map from each base slide's relative
        path to its digest.'''
        with SLIDELIST.open() as fh:
            listing = yaml.safe_load(fh)
        return {PurePath(name): digest for name, digest in listing.items()}

    def __str__(self) -> str:
        return self.relpath.as_posix()

    def _download_with_retries(self, url: str, dest: BinaryIO) -> str:
        '''Download url into dest and return the digest of the data.  An
        interrupted transfer is restarted from scratch, for at most five
        attempts in total.'''
        attempts_left = 5
        while True:
            attempts_left -= 1
            try:
                return _download(url, str(self), dest)
            except ConnectionInterrupted:
                if not attempts_left:
                    raise
                print('Retrying...')
                # discard the partial data before the next attempt
                dest.seek(0)
                dest.truncate()

    def fetch(self) -> None:
        '''Download the slide, and unpack it if it is a zip, unless its
        directory already exists.'''

        if self.path.exists():
            return

        url = urljoin(TESTDATA_URL, str(self))

        self.path.mkdir(parents=True)
        try:
            dest: BinaryIO
            if self.is_zip:
                # the archive itself is only needed until it is unpacked
                dest = TemporaryFile(dir=self.path)
            else:
                dest = open(self.path / self.path.name, 'wb')

            with dest:
                if self._download_with_retries(url, dest) != self.digest:
                    raise ValueError(f'Hash mismatch: {self}')

                if self.is_zip:
                    print(f'Unpacking {self}...')
                    with ZipFile(dest) as zf:
                        zf.extractall(path=self.path)
        except BaseException:
            # never leave a partially populated slide directory behind
            rmtree(self.path, ignore_errors=True)
            raise

    @cached_property
    def files(self) -> list[PurePath]:
        '''Paths of all files in the slide, relative to its directory.
        Fetches the slide first if necessary.'''

        def walk(basedir: Path) -> Iterator[PurePath]:
            for entry in basedir.iterdir():
                name = PurePath(entry.name)
                if entry.is_dir():
                    for sub in walk(entry):
                        yield name / sub
                else:
                    yield name

        self.fetch()
        return list(walk(self.path))


class UnpackedSlide:
    '''A slide unpacked into a directory so that programs can be run on it.'''

    def __init__(self, path: Path | None):
        # path of the actual slide file to open.  With no path, an empty
        # string is passed to the test programs in its place.
        self.path = path
        self.path_str = path.as_posix() if path else ''

    def run_prog(
        self,
        prog: str,
        valgrind: bool = False,
        extra_checks: bool = True,
        progdir: Path | None = None,
        debug: Iterable[str] | None = None,
        args: Iterable[str] | None = None,
        **kwargs: Any,
    ) -> subprocess.Popen[str]:
        '''Start the specified test program from the progdir directory
        (BUILDDIR if None) against the slide, running under Valgrind if
        requested.  If extra_checks is False, turn off debug
        instrumentation that would invalidate benchmark results.  debug
        options are passed in OPENSLIDE_DEBUG.  args are appended to the
        command line.  kwargs are passed to the Popen constructor.  Return
        the Popen instance, opened in text mode.'''

        if progdir is None:
            progdir = BUILDDIR
        # Start from our own environment, then pin the variables that
        # affect the test programs
        env = os.environ.copy()
        env.update(
            G_MESSAGES_DEBUG='',
            OPENSLIDE_DEBUG=','.join(debug or []),
            GIO_USE_VFS='local',
            LSAN_OPTIONS=f'suppressions={CLANG_LSAN_SUPPRESSIONS},print_suppressions=0',  # noqa: E501
            UBSAN_OPTIONS='print_stacktrace=1',
        )
        if extra_checks:
            env.update(
                MALLOC_CHECK_='1',
            )
        args_: list[str | Path] = []
        if valgrind:
            args_.extend(
                [
                    'valgrind',
                    '--quiet',
                    '--error-exitcode=3',
                    '--suppressions=' + VALGRIND_SUPPRESSIONS.as_posix(),
                    '--suppressions=' + VALGRIND_SUPPRESSIONS_GLIB.as_posix(),
                    '--leak-check=full',
                    '--num-callers=30',
                ]
            )
            env.update(
                # Valgrind has known issues detecting memory initialization by
                # libjpeg-turbo SIMD code.  Disable SIMD support.
                # https://github.com/libjpeg-turbo/libjpeg-turbo/issues/277
                JSIMD_FORCENONE='1',
            )
        args_.extend([progdir / prog, self.path_str])
        if args:
            args_.extend(args)
        return subprocess.Popen(args_, env=env, text=True, **kwargs)

    @staticmethod
    def _failure_message(proc: subprocess.Popen[str]) -> str | None:
        '''Wait for proc, whose stdout and stderr must be pipes, and
        summarize its outcome.  Return the combined, stripped output if
        there is any; otherwise 'Exited with status N' for a nonzero exit
        status; otherwise None.'''
        out, err = proc.communicate()
        if out or err:
            return (out + err).strip()
        # Only consult the exit status once we know there is no output.
        # Otherwise a silent failure would be reported as an empty string
        # rather than with its status.
        if proc.returncode:
            return f'Exited with status {proc.returncode}'
        return None

    def try_open(
        self,
        valgrind: bool = False,
        progdir: Path | None = None,
        debug: Iterable[str] | None = None,
        vendor: str | None | type[Skip] = Skip,
        properties: dict[str, str] | None = None,
        regions: Iterable[Iterable[int]] | None = None,
    ) -> str | None:
        '''Try opening the slide file, under Valgrind if specified, using
        the test program in the progdir directory.  Return None on success,
        error message on failure.  vendor is the vendor string that should
        be returned by openslide_detect_vendor(), None for NULL, or Skip to
        omit the test.  properties is a map of slide properties and their
        expected values.  regions is a list of region tuples (x, y, level,
        w, h).  debug is a list of OPENSLIDE_DEBUG options.'''

        args: list[str] = []
        if vendor is not Skip:
            vendor_str = cast(str, 'none' if vendor is None else vendor)
            args.extend(['-n', vendor_str])
        if properties:
            for k, v in properties.items():
                # a false-y expected value is passed as the empty string
                args.extend(['-p', '='.join([k, (v or '')])])
        if regions:
            for region in regions:
                args.extend(['-r', ' '.join(str(d) for d in region)])
        proc = self.run_prog(
            'try_open',
            valgrind=valgrind,
            args=args,
            progdir=progdir,
            debug=debug or [],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return self._failure_message(proc)

    def try_extended(
        self,
        valgrind: bool = False,
        progdir: Path | None = None,
        debug: Iterable[str] | None = None,
    ) -> str | None:
        '''Run the extended test program against the slide file, under
        Valgrind if specified, using the test program in the progdir
        directory.  Return None on success, error message on failure.'''

        proc = self.run_prog(
            'extended',
            valgrind=valgrind,
            progdir=progdir,
            debug=debug or [],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return self._failure_message(proc)


class TestCaseConfig:
    '''Parsed contents of a test case's configuration file.'''

    def __init__(self, testname: str):
        self.testname = testname

        # https://openslide.org/docs/testsuite/#annotated-configyaml
        try:
            with open(CASEROOT / testname / TESTCONF) as fh:
                conf = yaml.safe_load(fh)
        except FileNotFoundError:
            raise ValueError('Test does not exist')

        # mandatory fields
        self.base_slide = BaseSlide(PurePath(conf['base']))
        self.filename: str = conf['slide']
        self.success: bool = conf['success']
        if not self.success:
            # the expected error pattern exists only for failing tests
            self.error = re.compile(conf['error'])

        # optional fields and their defaults
        optional = conf.get
        self.vendor: str | None = optional('vendor')
        self.primary: bool = optional('primary', False)
        self.freezable: bool = optional('freezable', True)
        self.required_features: set[str] = set(optional('requires', []))
        self.debug: set[str] = set(optional('debug', []))
        self.regions: list[list[int]] = optional('regions', [])
        self.properties: dict[str, str] = optional('properties', {})
        self.generators = self._generator_map(conf, 'generate')
        self.copies = self._file_map(conf, 'copy')
        self.renames = self._file_map(conf, 'rename')

    def __str__(self) -> str:
        return self.testname

    @staticmethod
    def _generator_map(
        conf: dict[str, Any], field: str
    ) -> dict[PurePath, str]:
        '''Return the mapping stored under field (empty if absent) with its
        keys converted to paths.'''
        entries = conf.get(field, {})
        return {PurePath(name): value for name, value in entries.items()}

    @staticmethod
    def _file_map(
        conf: dict[str, Any], field: str
    ) -> dict[PurePath, PurePath]:
        '''Return the mapping stored under field (empty if absent) with
        both keys and values converted to paths.'''
        entries = conf.get(field, {})
        return {
            PurePath(name): PurePath(target)
            for name, target in entries.items()
        }

    @property
    def features_available(self) -> bool:
        '''True if every feature the test requires is available in this
        build.'''
        return self.required_features <= FEATURES

    @staticmethod
    def create(base_slide: BaseSlide, testpath: Path) -> None:
        '''Write an initial configuration file into testpath, using the
        first file of base_slide that opens successfully as the slide
        file.'''
        for relpath in base_slide.files:
            unpacked = UnpackedSlide(base_slide.path / relpath)
            if unpacked.try_open() is None:
                break
        else:
            raise OSError('Could not locate readable slide file')
        slidefile = relpath

        query = unpacked.run_prog('query', args=['-n'], stdout=subprocess.PIPE)
        vendor_str, _ = query.communicate()
        if query.returncode:
            raise OSError('Could not query slide vendor')
        # empty output is recorded as no vendor
        vendor = vendor_str.strip() or None

        conf = dict(
            success=False,
            error='^$',
            base=base_slide.relpath.as_posix(),
            slide=slidefile.as_posix(),
            vendor=vendor,
        )
        with open(testpath / TESTCONF, 'w') as fh:
            yaml.safe_dump(conf, fh, default_flow_style=False)

    @staticmethod
    def exists(testname: str) -> bool:
        '''True if the named test has a configuration file.'''
        conf_path = CASEROOT / testname / TESTCONF
        return conf_path.exists()


class TestCase:
    '''A named test case, described by the configuration file in its
    directory under CASEROOT.'''

    # Cleared by _try_reflink() after the first failed clone attempt, so
    # that later calls return immediately.
    _can_reflink = True

    def __init__(self, name: str):
        self.name = name
        self.conf = TestCaseConfig(name)

    def __str__(self) -> str:
        return self.name

    @classmethod
    @lru_cache
    def list(cls, pattern: str = '*') -> list[TestCase]:
        '''Return a list of tests matching the specified pattern.'''
        # Sorted by directory name; directories without a configuration
        # file are ignored.  Results are memoized by lru_cache.
        return [
            cls(path.name)
            for path in sorted(CASEROOT.iterdir())
            if fnmatch.fnmatch(path.name, pattern)
            and TestCaseConfig.exists(path.name)
        ]

    @staticmethod
    def create(base_slide: BaseSlide, name: str) -> None:
        '''Create a new test case with the specified base slide and name.'''

        testpath = CASEROOT / name
        if testpath.exists():
            raise ValueError('A test with that name already exists')

        base_slide.fetch()

        print(f'Creating test {name} for {base_slide}')
        testpath.mkdir()
        # Working copy of the pristine slide, consumed later by pack()
        copytree(base_slide.path, testpath / 'slide')
        TestCaseConfig.create(base_slide, testpath)

    @staticmethod
    def pack(name: str) -> None:
        '''Pack a newly-created test case for checkin.'''

        # Each file of the working copy in CASEROOT/<name>/slide is compared
        # with its pristine original.  A modified file is stored as an
        # xdelta and a missing file as an empty whiteout marker.  The
        # working copy is then removed.
        conf = TestCaseConfig(name)
        print(f'Packing {name}...')

        total_size = 0
        for relpath in conf.base_slide.files + list(conf.copies):
            # For a 'copy' entry, the original is the copy's source file
            origpath = conf.base_slide.path / conf.copies.get(relpath, relpath)
            newpath = CASEROOT / name / 'slide' / relpath
            # Delta and whiteout files are named after the file's basename
            # only, not its full relative path
            deltapath = CASEROOT / name / f'{relpath.name}.xdelta'
            whiteoutpath = CASEROOT / name / f'{relpath.name}.whiteout'

            # Refuse to overwrite the results of an earlier pack
            for path in deltapath, whiteoutpath:
                if path.exists():
                    raise OSError(f'{path} already exists')

            if newpath.exists():
                # Byte-for-byte comparison; unchanged files need no delta
                if not filecmp.cmp(origpath, newpath, shallow=False):
                    subprocess.check_call(
                        [
                            'xdelta3',
                            'encode',
                            '-9',
                            '-W',
                            '16777216',
                            '-S',
                            'none',
                            '-s',
                            origpath,
                            newpath,
                            deltapath,
                        ]
                    )
                    total_size += deltapath.stat().st_size
            elif relpath not in conf.copies:
                # File was deleted from the working copy
                whiteoutpath.write_bytes(b'')

        rmtree(CASEROOT / name / 'slide')

        # Report in KB unless the total is under 1 KB
        total_size_kb = total_size >> 10
        if total_size_kb:
            print(f'Delta: {total_size_kb} KB')
        else:
            print(f'Delta: {total_size} bytes')

    def unpack(self) -> None:
        '''Unpack the test case.'''

        # Files are materialized under WORKROOT/<name>.  Files that already
        # exist there, or that have a whiteout marker, are skipped.
        conf = self.conf
        conf.base_slide.fetch()
        printed = False

        for relpath in conf.base_slide.files + list(conf.copies):
            origpath = conf.base_slide.path / conf.copies.get(relpath, relpath)
            newpath = WORKROOT / self.name / conf.renames.get(relpath, relpath)
            deltapath = CASEROOT / self.name / f'{relpath.name}.xdelta'
            whiteoutpath = CASEROOT / self.name / f'{relpath.name}.whiteout'

            if not newpath.exists() and not whiteoutpath.exists():
                # Print the banner once, and only if there is work to do
                if not printed:
                    print(f'Unpacking {self}...')
                    printed = True

                newpath.parent.mkdir(parents=True, exist_ok=True)

                # Precedence: generator command, then xdelta, then a
                # relative symlink to the unmodified pristine file
                generator = conf.generators.get(relpath)
                if generator:
                    self._run_generator(generator, origpath, newpath)
                elif deltapath.exists():
                    self._decode_xdelta(origpath, deltapath, newpath)
                else:
                    src = os.path.relpath(origpath, newpath.parent)
                    newpath.symlink_to(src)

    @staticmethod
    def _run_generator(command_str: str, inpath: Path, outpath: Path) -> None:
        '''Run the specified generator pipeline.'''

        # command_str is split on '|' into stages.  Each stage is tokenized
        # with shlex.split(), and each token is %-formatted with the keys
        # 'in' (inpath) and 'out' (outpath).
        cmds = command_str.split('|')
        procs = []
        # fin[i]/fout[i] are the stdin/stdout descriptors of stage i.  None
        # means the stage inherits ours.
        fin: list[int | None] = []
        fout: list[int | None] = []
        try:
            fin.append(None)
            # One pipe between each pair of adjacent stages
            for _ in range(len(cmds) - 1):
                pipe_r, pipe_w = os.pipe()
                fout.append(pipe_w)
                fin.append(pipe_r)
            fout.append(None)

            for i, cmd in enumerate(cmds):
                proc = subprocess.Popen(
                    [
                        a % {'in': inpath, 'out': outpath}
                        for a in shlex.split(cmd)
                    ],
                    stdin=fin[i],
                    stdout=fout[i],
                    close_fds=True,
                )
                procs.append(proc)
        finally:
            # Close our copies of every pipe end, whether or not all stages
            # were started
            for fh in fout + fin:
                if fh is not None:
                    os.close(fh)

        # Wait for every stage; report the first nonzero exit status
        returncode = 0
        for proc in procs:
            proc.wait()
            returncode = returncode or proc.returncode
        if returncode:
            raise OSError(f'Generator returned exit status {returncode}')

    @classmethod
    def _decode_xdelta(
        cls, inpath: Path, deltapath: Path, outpath: Path
    ) -> None:
        '''Apply an xdelta based on inpath and write it to outpath.'''
        # If possible, reflink inpath to outpath and write only the changed
        # blocks, saving a lot of disk space.
        if cls._try_reflink(inpath, outpath):
            bufsize = 1 << 20
            # We could take this down to stat(outpath).st_blksize
            # granularity (4K) but we don't need perfect savings.  Mostly
            # we're trying to avoid copying huge runs of unchanged data.
            blksize = 1 << 20
            # xdelta3 writes the decoded file to stdout
            proc = subprocess.Popen(
                ['xdelta3', 'decode', '-cs', inpath, deltapath],
                stdout=subprocess.PIPE,
                bufsize=bufsize,
            )
            assert proc.stdout is not None
            with outpath.open('rb+', buffering=bufsize) as fh:
                while True:
                    want = proc.stdout.read(blksize)
                    if len(want) == 0:
                        break
                    have = fh.read(blksize)
                    if want != have:
                        # Rewind over the block just read and overwrite it
                        fh.seek(-len(have), 1)
                        fh.write(want)
                # Drop anything in the clone beyond the decoded length
                fh.truncate()
            if proc.wait() != 0:
                raise OSError(f'xdelta failed with status {proc.returncode}')
        else:
            subprocess.check_call(
                ['xdelta3', 'decode', '-s', inpath, deltapath, outpath]
            )

    @classmethod
    def _try_reflink(cls, inpath: Path, outpath: Path) -> bool:
        '''Try to make a zero-copy clone of the file inpath at outpath.
        Return True if successful.'''
        # Python can't do this natively yet, and we don't want to require an
        # external module
        # https://github.com/python/cpython/issues/81338
        if not cls._can_reflink:
            return False
        system = platform.system()
        if system == 'Linux':
            ret = subprocess.call(
                ['cp', '--reflink=always', '--', inpath, outpath],
                stderr=subprocess.DEVNULL,
            )
            if ret == 0:
                return True
            # cp from coreutils <= 9.1 leaves an empty file at outpath
            # https://debbugs.gnu.org/cgi/bugreport.cgi?bug=56391
            outpath.unlink(missing_ok=True)
        elif system == 'Darwin':
            ret = subprocess.call(
                ['cp', '-c', inpath, outpath], stderr=subprocess.DEVNULL
            )
            if ret == 0:
                return True
        # If we fail once, don't try again.  It's possible that we might
        # succeed under other circumstances (e.g. inpath and outpath might
        # be on different filesystems) but let's not repeatedly spawn
        # failing subprocesses.
        cls._can_reflink = False
        return False

    def run(
        self,
        valgrind: bool = False,
        xfail: bool = False,
        progdir: Path | None = None,
        workdir: Path = WORKROOT,
    ) -> tuple[bool, str]:
        '''Run the test, under Valgrind if specified.  Also execute extended
        tests against cases which 1) are marked primary, 2) are expected to
        succeed, and 3) do in fact succeed.  If xfail is specified, invert
        the sense of the result.'''

        # Returns (ok, message), where message is a colorized one-line
        # summary.  A test whose required features are unavailable counts
        # as ok and is reported as skipped, regardless of xfail.
        conf = self.conf
        if not conf.features_available:
            return True, _color(BLUE, f'{self}: skipped')
        if conf.filename == '':
            # synthetic test slide
            unpacked = UnpackedSlide(None)
        else:
            unpacked = UnpackedSlide(workdir / self.name / conf.filename)
        result = unpacked.try_open(
            valgrind,
            progdir,
            vendor=conf.vendor,
            properties=conf.properties,
            regions=conf.regions,
            debug=conf.debug,
        )

        # result is None if the slide opened cleanly, else an error message
        msg = _color(GREEN, f'{self}: OK')
        ok = True
        if result is None and not conf.success:
            msg = _color(RED, f'{self}: unexpected success')
            ok = False
        elif result is not None and conf.success:
            msg = _color(RED, f'{self}: unexpected failure: {result}')
            ok = False
        elif result is not None and not conf.error.search(result):
            # Failed as expected, but not with the expected message
            msg = _color(RED, f'{self}: incorrect error: {result}')
            ok = False
        elif conf.primary and conf.success:
            result = unpacked.try_extended(valgrind, progdir, debug=conf.debug)
            if result:
                msg = _color(RED, f'{self}: extended test failed: {result}')
                ok = False

        if xfail:
            ok = not ok
            if ok:
                msg = _color(BLUE, f'{self}: failed as expected')
            else:
                msg = _color(RED, f'{self}: expected to fail, but passed')

        return ok, msg


class S3Uploader:
    '''Uploads objects to one S3 bucket and builds their public URLs.'''

    def __init__(self, bucket: str):
        # imported here so that boto3 is needed only when uploading
        import boto3

        self._s3 = boto3.client('s3')
        self._bucket = bucket
        location = self._s3.get_bucket_location(Bucket=bucket)
        # an empty location constraint is treated as us-east-1
        region = location['LocationConstraint'] or 'us-east-1'
        self._baseurl = (
            f'https://{bucket}.s3.dualstack.{region}.amazonaws.com/'
        )

    def url(self, key: PurePath) -> str:
        '''Return public URL for key.'''
        return f'{self._baseurl}{key.as_posix()}'

    def exists(self, key: PurePath) -> bool:
        '''Return True if key exists in the bucket.  Client errors other
        than a 404 are propagated.'''
        try:
            self._s3.head_object(Bucket=self._bucket, Key=key.as_posix())
        except self._s3.exceptions.ClientError as e:
            # https://github.com/boto/boto3/issues/2442
            if e.response['Error']['Code'] == '404':
                return False
            raise
        return True

    def upload(
        self,
        key: PurePath,
        fh: BinaryIO,
        content_type: str = 'application/octet-stream',
    ) -> None:
        '''Upload the entire contents of fh to the bucket under key,
        printing progress at most once per second.'''
        # Seeking to the end gives us the total size for progress reports
        length = fh.seek(0, os.SEEK_END)
        total_mb = length >> 20
        state = {'sent': 0, 'shown_at': 0.0}

        def progress(count: int) -> None:
            state['sent'] += count
            now = curtime()
            if now - state['shown_at'] < 1:
                return
            sent_mb = state['sent'] >> 20
            print(
                f'Uploading {sent_mb}/{total_mb} MB...\r',
                end='',
                flush=True,
            )
            state['shown_at'] = now

        # Rewind and send the object
        fh.seek(0)
        self._s3.upload_fileobj(
            fh,
            self._bucket,
            key.as_posix(),
            Callback=progress,
            ExtraArgs={'ContentType': content_type},
        )
        print('{:<79}'.format(f'Uploaded {total_mb} MB'))


def _download(url: str, name: str, fh: BinaryIO) -> str:
    '''Stream url into the file handle fh, reporting progress under the
    label name, and return the SHA-256 hex digest of the data.  Raise
    ConnectionInterrupted if the transfer times out or delivers a different
    number of bytes than announced in Content-Length.'''

    print(f'Fetching {name}...\r', end='', flush=True)
    response = http.get(url, stream=True, timeout=120)
    response.raise_for_status()

    received = 0
    last_report = 0.0
    total = int(response.headers['Content-Length'])
    hasher = sha256()

    try:
        for chunk in response.iter_content(128 << 10):
            fh.write(chunk)
            hasher.update(chunk)
            received += len(chunk)

            # refresh the progress line at most once per second
            now = curtime()
            if now - last_report >= 1:
                print(
                    f'Fetching {name} ({received >> 20}/{total >> 20} MB)...\r',
                    end='',
                    flush=True,
                )
                last_report = now
        if received != total:
            # short read
            raise ConnectionInterrupted
    except (
        ConnectionInterrupted,
        requests.exceptions.Timeout,
        socket.timeout,
    ):
        failure = f'Failure fetching {name} ({received >> 20}/{total >> 20} MB)'
        print('{:<79}'.format(failure))
        raise ConnectionInterrupted

    print('{:<79}'.format(f'Fetched {name} ({total >> 20} MB)'))
    return hasher.hexdigest()


@_command
def create(slide: str, testname: str) -> None:
    '''Create a new test case with the specified name and base slide (e.g.
    "Mirax/CMU-1.zip").'''

    base_slide = BaseSlide(PurePath(slide))
    TestCase.create(base_slide, testname)


@_command
def pack(testname: str) -> None:
    '''Pack a newly-created test case for checkin.'''

    # thin command-line wrapper around TestCase.pack()
    TestCase.pack(testname)


@_command
def unpack(pattern: str = '*') -> None:
    '''Unpack all tests matching the specified pattern.  If pattern is
    `nonfrozen`, unpack tests for which we don't have a frozen counterpart.'''
    only_nonfrozen = pattern == 'nonfrozen'
    if only_nonfrozen:
        # consider every test, then filter on the frozen directory below
        pattern = '*'
    for test in TestCase.list(pattern):
        if only_nonfrozen and (FROZEN / test.name).exists():
            continue
        test.unpack()


# Read-only FUSE filesystem that proxies a backing directory tree and builds
# a shadow directory tree of sparse files containing only the accessed
# bytes.
#
# _fusefs_init() and _fusefs_run() are based on pyfuse3's
# examples/passthroughfs.py, copyright © Nikolaus Rath <Nikolaus.org>
def _fusefs_init(shadowdir: Path) -> None:
    '''Prepare a FUSE filesystem to run, and mount it.

    The filesystem proxies WORKROOT read-only at FUSEMOUNT and additionally
    exposes PRISTINE as `_pristine` in its root directory.  Data returned
    by read() is also written, at the same offset, to a file under
    shadowdir that is pre-sized to the backing file's length; symlinks
    resolved through readlink() are recreated under shadowdir.  This
    function only calls pyfuse3.init(); the request loop is started
    separately by _fusefs_run().'''
    # trio, imported by pyfuse3, tries to override sys.excepthook and
    # complains if the distro already put something there.  We don't need
    # the distro's error reporting, so reset the excepthook to default.
    sys.excepthook = sys.__excepthook__

    # Imported lazily so that pyfuse3 is only required by the commands
    # that actually mount the filesystem.
    import pyfuse3
    from pyfuse3 import FileHandleT, FileNameT, FUSEError, InodeT

    # A Unix file descriptor
    FileDescriptorT = NewType('FileDescriptorT', FileHandleT)

    class Operations(pyfuse3.Operations):
        '''Request handlers implementing the proxy-and-shadow behavior.'''

        def __init__(self) -> None:
            super().__init__()
            # inode -> (relative path within FUSE FS, backing path on disk)
            self._inode_path_map = {
                pyfuse3.ROOT_INODE: (PurePath('.'), WORKROOT)
            }
            # inode -> number of lookups recorded by _add_paths() and not
            # yet released through forget()
            self._lookup_cnt: dict[InodeT, int] = defaultdict(int)
            # Two-way mapping between an open backing fd and its inode; at
            # most one backing fd is kept per inode (see open())
            self._fd_inode_map: dict[FileDescriptorT, InodeT] = dict()
            self._inode_fd_map: dict[InodeT, FileDescriptorT] = dict()
            # backing fd -> writable file object for the shadow copy
            self._fd_shadow_map: dict[FileDescriptorT, BinaryIO] = dict()
            # backing fd -> number of open() calls not yet release()d
            self._fd_open_count: dict[FileDescriptorT, int] = dict()

        def _inode_to_paths(self, inode: InodeT) -> tuple[PurePath, Path]:
            '''Return (relative path, backing path) for a known inode, or
            raise FUSEError(ENOENT) if the inode has not been recorded.'''
            try:
                return self._inode_path_map[inode]
            except KeyError:
                raise FUSEError(errno.ENOENT)

        def _add_paths(
            self, inode: InodeT, relpath: PurePath, backingpath: Path
        ) -> None:
            '''Count one more lookup of inode and remember its paths.  If
            the inode is already known, its existing paths are kept.'''
            self._lookup_cnt[inode] += 1
            self._inode_path_map.setdefault(inode, (relpath, backingpath))

        def _shadowpath(self, relpath: PurePath) -> Path:
            '''Return the location of relpath under shadowdir, creating
            its parent directories as a side effect.'''
            shadowpath = shadowdir / relpath
            shadowpath.parent.mkdir(parents=True, exist_ok=True)
            return shadowpath

        async def forget(
            self, inode_list: Sequence[tuple[InodeT, int]]
        ) -> None:
            '''Release nlookup lookups of each listed inode, dropping the
            inode's bookkeeping once no lookups remain.'''
            for inode, nlookup in inode_list:
                if self._lookup_cnt[inode] > nlookup:
                    self._lookup_cnt[inode] -= nlookup
                    continue
                # Last reference: the inode must not still be open
                assert inode not in self._inode_fd_map
                del self._lookup_cnt[inode]
                try:
                    del self._inode_path_map[inode]
                except KeyError:  # may have been deleted
                    pass

        async def lookup(
            self,
            inode_p: InodeT,
            filename: FileNameT,
            ctx: pyfuse3.RequestContext,
        ) -> pyfuse3.EntryAttributes:
            '''Resolve filename within directory inode_p and return the
            attributes of the backing file.  Raises FUSEError if the
            parent inode is unknown or the backing file cannot be
            lstat()ed.'''
            name = os.fsdecode(filename)
            relpath, backingpath = self._inode_to_paths(inode_p)
            relpath /= name
            backingpath /= name
            # Special case: map PRISTINE into the filesystem even though it's
            # outside WORKROOT
            if inode_p == pyfuse3.ROOT_INODE and name == '_pristine':
                backingpath = PRISTINE
            attr = self._getattr(backingpath)
            # '.' and '..' are resolved but do not get a lookup count
            if name != '.' and name != '..':
                self._add_paths(attr.st_ino, relpath, backingpath)
            return attr

        async def getattr(
            self, inode: InodeT, ctx: pyfuse3.RequestContext
        ) -> pyfuse3.EntryAttributes:
            '''Return the attributes of the backing file for inode.'''
            _, backingpath = self._inode_to_paths(inode)
            return self._getattr(backingpath)

        def _getattr(self, backingpath: Path) -> pyfuse3.EntryAttributes:
            '''lstat() backingpath and convert the result to EntryAttributes.
            The backing file's own inode number is reused as the FUSE
            inode.  OSError is translated to FUSEError.'''
            try:
                stat = backingpath.lstat()
            except OSError as exc:
                raise FUSEError(exc.errno or errno.EIO)
            entry = pyfuse3.EntryAttributes()
            for attr in (
                'st_ino',
                'st_mode',
                'st_nlink',
                'st_uid',
                'st_gid',
                'st_rdev',
                'st_size',
                'st_atime_ns',
                'st_mtime_ns',
                'st_ctime_ns',
            ):
                setattr(entry, attr, getattr(stat, attr))
            entry.generation = 0
            # Zero timeouts: presumably so the kernel does not cache entries
            # or attributes and every access reaches this process -- confirm
            # against the pyfuse3 documentation.
            entry.entry_timeout = 0
            entry.attr_timeout = 0
            # Report a block count derived from the file size (rounded up
            # to whole 512-byte blocks) rather than the backing file's own.
            entry.st_blksize = 512
            entry.st_blocks = (
                entry.st_size + entry.st_blksize - 1
            ) // entry.st_blksize
            return entry

        async def readlink(
            self, inode: InodeT, ctx: pyfuse3.RequestContext
        ) -> FileNameT:
            '''Return the target of the symlink at inode, rewriting links
            into PRISTINE so they point at `_pristine` inside the FUSE
            root, and create the same symlink under shadowdir if it is
            not already there.'''
            relpath, backingpath = self._inode_to_paths(inode)
            try:
                target = Path(os.readlink(backingpath))
                # calculate target relative to the root of the FUSE FS
                reltarget = os.path.normpath(relpath.parent / target)
                # calculate path to PRISTINE relative to WORKROOT
                relpristine = os.path.relpath(PRISTINE, WORKROOT)
                if reltarget.startswith(relpristine):
                    # link points to pristine directory; repoint it inside
                    # the FUSE root
                    reltarget = reltarget.replace(relpristine, '_pristine')
                    target = Path(os.path.relpath(reltarget, relpath.parent))
                shadowpath = self._shadowpath(relpath)
                # lexists() so that an existing (possibly dangling) shadow
                # symlink is left alone
                if not os.path.lexists(shadowpath):
                    shadowpath.symlink_to(target)
            except OSError as exc:
                raise FUSEError(exc.errno or errno.EIO)
            return cast(FileNameT, os.fsencode(target))

        async def opendir(
            self, inode: InodeT, ctx: pyfuse3.RequestContext
        ) -> FileHandleT:
            '''Open a directory.  No state is kept; the inode number itself
            is returned as the directory handle.'''
            return cast(FileHandleT, inode)

        async def readdir(
            self, fh: FileHandleT, off: int, token: pyfuse3.ReaddirToken
        ) -> None:
            '''Report the entries of the directory identified by fh (an
            inode number, see opendir()), resuming after offset off.'''
            inode = cast(InodeT, fh)
            reldir, backingdir = self._inode_to_paths(inode)
            entries = []
            if inode == pyfuse3.ROOT_INODE:
                # Insert _pristine into the root dir
                attr = self._getattr(PRISTINE)
                entries.append(
                    (
                        attr.st_ino,
                        '_pristine',
                        reldir / '_pristine',
                        PRISTINE,
                        attr,
                    )
                )
            for path in backingdir.iterdir():
                relpath = reldir / path.name
                backingpath = backingdir / path.name
                attr = self._getattr(backingpath)
                entries.append(
                    (attr.st_ino, path.name, relpath, backingpath, attr)
                )
            # Entries are emitted in inode order and each entry's inode
            # number is passed to readdir_reply() as its offset, so a later
            # call resumes by skipping everything with ino <= off.
            for ino, name, relpath, backingpath, attr in sorted(entries):
                if ino <= off:
                    continue
                # A false return stops the listing (presumably the reply
                # buffer is full); the entry that was refused is not
                # counted as looked up.
                if not pyfuse3.readdir_reply(
                    token, cast(FileNameT, os.fsencode(name)), attr, ino
                ):
                    break
                self._add_paths(attr.st_ino, relpath, backingpath)

        async def statfs(
            self, ctx: pyfuse3.RequestContext
        ) -> pyfuse3.StatvfsData:
            '''Return filesystem statistics copied from the filesystem that
            holds WORKROOT.'''
            stat_ = pyfuse3.StatvfsData()
            try:
                statfs = os.statvfs(WORKROOT)
            except OSError as exc:
                raise FUSEError(exc.errno or errno.EIO)
            for attr in (
                'f_bsize',
                'f_frsize',
                'f_blocks',
                'f_bfree',
                'f_bavail',
                'f_files',
                'f_ffree',
                'f_favail',
            ):
                setattr(stat_, attr, getattr(statfs, attr))
            # Reduce the reported maximum name length by the length of the
            # WORKROOT path plus one separator (follows the pyfuse3
            # passthroughfs example this code is based on).
            stat_.f_namemax = statfs.f_namemax - (len(WORKROOT.as_posix()) + 1)
            return stat_

        async def open(
            self,
            inode: InodeT,
            flags: pyfuse3.FlagT,
            ctx: pyfuse3.RequestContext,
        ) -> pyfuse3.FileInfo:
            '''Open the backing file for inode and its shadow file.  A
            second open of an inode that is already open reuses the
            existing descriptor and only bumps its open count.'''
            # set direct_io to avoid kernel readahead
            if inode in self._inode_fd_map:
                fd = self._inode_fd_map[inode]
                self._fd_open_count[fd] += 1
                return pyfuse3.FileInfo(fh=fd, direct_io=True)
            # The filesystem is mounted read-only; creation is not expected
            assert flags & os.O_CREAT == 0
            try:
                relpath, backingpath = self._inode_to_paths(inode)
                fd = cast(FileDescriptorT, os.open(backingpath, flags))
                # Opened without O_TRUNC, so data recorded in an existing
                # shadow file is kept
                shadow = os.fdopen(
                    os.open(
                        self._shadowpath(relpath),
                        os.O_WRONLY | os.O_CREAT,
                        0o644,
                    ),
                    'wb',
                )
                # Size the shadow file to match the backing file without
                # writing any data to it
                shadow.truncate(os.lseek(fd, 0, os.SEEK_END))
            except OSError as exc:
                raise FUSEError(exc.errno or errno.EIO)
            self._inode_fd_map[inode] = fd
            self._fd_inode_map[fd] = inode
            self._fd_shadow_map[fd] = shadow
            self._fd_open_count[fd] = 1
            return pyfuse3.FileInfo(fh=fd, direct_io=True)

        async def read(
            self, fh: FileHandleT, offset: int, length: int
        ) -> bytes:
            '''Read up to length bytes at offset from the backing file,
            copy the bytes actually read into the shadow file at the same
            offset, and return them.'''
            fd = cast(FileDescriptorT, fh)
            os.lseek(fd, offset, os.SEEK_SET)
            buf = os.read(fd, length)
            shadow = self._fd_shadow_map[fd]
            shadow.seek(offset)
            shadow.write(buf)
            return buf

        async def release(self, fh: FileHandleT) -> None:
            '''Drop one open reference to fh; when it was the last one,
            forget the descriptor and close both the backing file and the
            shadow file.'''
            fd = cast(FileDescriptorT, fh)
            if self._fd_open_count[fd] > 1:
                self._fd_open_count[fd] -= 1
                return

            del self._fd_open_count[fd]
            inode = self._fd_inode_map[fd]
            del self._inode_fd_map[inode]
            del self._fd_inode_map[fd]
            shadow = self._fd_shadow_map.pop(fd)
            try:
                os.close(fd)
                shadow.close()
            except OSError as exc:
                raise FUSEError(exc.errno or errno.EIO)

    # stat() on the mountpoint failing with ENOTCONN indicates a mount left
    # behind by an earlier run; unmount it.  Any other stat() failure
    # (including the mountpoint not existing yet) is ignored.
    try:
        FUSEMOUNT.stat()
    except OSError as e:
        if e.errno == errno.ENOTCONN:
            # Clean up old mountpoint
            subprocess.check_call(['fusermount', '-u', FUSEMOUNT])
    FUSEMOUNT.mkdir(parents=True, exist_ok=True)
    # Start from pyfuse3's default mount options and add a filesystem name
    # and the read-only flag
    fuse_options = set(pyfuse3.default_options)
    fuse_options.update(['fsname=openslide', 'ro'])
    pyfuse3.init(Operations(), FUSEMOUNT.as_posix(), fuse_options)


def _fusefs_run() -> None:
    '''Serve requests for a filesystem set up by _fusefs_init() until the
    pyfuse3 main loop returns, then shut pyfuse3 down.'''
    import pyfuse3
    import trio

    try:
        trio.run(pyfuse3.main)
    except BaseException:
        # The loop died abnormally: release pyfuse3's resources but leave
        # the mountpoint alone, then let the error propagate.
        pyfuse3.close(unmount=False)
        raise
    else:
        # Normal termination
        pyfuse3.close()


@_command
def freeze(bucket: str = DEFAULT_FROZEN_BUCKET) -> None:
    '''Create a frozen testdata archive for transport to another system,
    upload it to an S3 bucket, and record its URL in the source tree.'''
    # Make sure every test and every slide needed by the mosaic is present
    # in the real work tree before proxying it through FUSE.
    for test in TestCase.list():
        test.unpack()
    _fetch_for_mosaic()

    print('Running tests...')
    exclude_tests = []
    # tempdir receives the shadow tree: sparse copies holding only the bytes
    # that are read through the FUSE mount below.
    with TemporaryDirectory(prefix='shadow-', dir=FROZENBASE) as tempdir_path:
        tempdir = Path(tempdir_path)
        _fusefs_init(tempdir)
        # Serve FUSE requests from a separate thread while the tests run in
        # this one.
        Thread(name='fuse', target=_fusefs_run, daemon=False).start()
        # Run all tests that we might want to run on thawed testdata
        for test in TestCase.list():
            if test.conf.freezable:
                # The pass/fail result is ignored; only the message is shown
                _, msg = test.run(workdir=FUSEMOUNT)
                print(msg)
                # Ensure we at least have a shadow directory for the test.
                # Otherwise, tests that solely test opening a missing file
                # in a multi-file format will cause a base slide fetch and
                # unpack when run against frozen data.
                (tempdir / test.name).mkdir(parents=True, exist_ok=True)
            else:
                exclude_tests.append(test)
        # Build the mosaic from the pristine slides as seen through the
        # mount so those reads are recorded too; the mosaic image itself
        # goes to a temporary file that is discarded.
        with NamedTemporaryFile(prefix='openslide-') as mosaic:
            _mosaic(Path(mosaic.name), FUSEMOUNT / '_pristine')
        # Unmount; presumably this also lets the main loop in the fuse
        # thread return -- confirm against pyfuse3's documentation.
        subprocess.check_call(['fusermount', '-u', FUSEMOUNT])

        print('Freezing testdata...')
        # Delete spurious shadow directories from any incidental filesystem
        # access to skipped tests
        for test in exclude_tests:
            path = tempdir / test.name
            if path.exists():
                rmtree(path)
        # Python tarfile doesn't support creating sparse archives
        proc = subprocess.Popen(
            [
                'gtar',
                'c',
                '--sparse',
                # Make archive reproducible
                '--owner=0',
                '--group=0',
                '--mtime=@0',
                '--sort=name',
                '-C',
                tempdir,
                '.',
            ],
            stdout=subprocess.PIPE,
        )
        assert proc.stdout is not None
        # Stream tar's output through LZMA, hashing and storing the
        # compressed bytes as they are produced.
        compressor = LZMACompressor(preset=9)
        hasher = sha256()
        with TemporaryFile(prefix='frozen-', dir=FROZENBASE) as frozen:

            def update(buf: bytes) -> None:
                hasher.update(buf)
                frozen.write(buf)

            while True:
                buf = proc.stdout.read(1 << 20)
                if not buf:
                    break
                update(compressor.compress(buf))
            update(compressor.flush())
            proc.wait()
            if proc.returncode != 0:
                raise OSError('tar failed')
            # SHA-256 of the compressed archive; also used as the object
            # name in the bucket, so identical archives are uploaded once.
            digest = hasher.hexdigest()

            s3 = S3Uploader(bucket)
            if not s3.exists(PurePath(digest)):
                print('Uploading archive...')
                # NOTE(review): frozen's file position is at end-of-file
                # here; presumably S3Uploader.upload() rewinds it -- confirm.
                s3.upload(
                    PurePath(digest),
                    frozen,
                    content_type='application/x-xz',
                )

    # Record where the archive lives and how to verify it
    manifest = {
        'url': s3.url(PurePath(digest)),
        'sha256': digest,
    }
    with FROZENLIST.open('w') as frozenlist:
        yaml.safe_dump(manifest, frozenlist, default_flow_style=False)


@_command
def unfreeze() -> None:
    '''Download and unpack the current frozen archive.'''
    # FROZENLIST is the YAML manifest giving the archive's URL and SHA-256.
    # The unpacked tree lives in FROZENBASE/frozen-<sha256>, and FROZEN is a
    # symlink to it that is created only after a successful unpack.
    with FROZENLIST.open() as fh:
        manifest = yaml.safe_load(fh)
    expected_sha256 = manifest['sha256']
    link_target = 'frozen-' + expected_sha256

    if os.path.lexists(FROZEN):
        current = Path(os.readlink(FROZEN))
        if current.as_posix() == link_target:
            # The current archive is already unpacked
            return
        # FROZEN refers to a different archive: drop the link and its data
        FROZEN.unlink()
        stale = FROZENBASE / current
        if stale.exists():
            rmtree(stale)

    basedir = FROZENBASE / link_target
    # Because FROZEN is only created on success, a directory that already
    # exists at this point is debris from an interrupted download or
    # unpack.  Remove it; otherwise mkdir() raises FileExistsError on every
    # retry until someone deletes the directory by hand.
    if basedir.exists():
        rmtree(basedir)
    basedir.mkdir(parents=True)

    with TemporaryFile(dir=FROZENBASE, prefix='frozen-') as fh:
        found_sha256 = _download(manifest['url'], 'frozen archive', fh)
        if found_sha256 != expected_sha256:
            raise OSError(
                f'Hash mismatch: expected {expected_sha256}, '
                f'found {found_sha256}'
            )
        fh.seek(0)
        print('Unpacking...')
        # Close the TarFile deterministically instead of leaking it
        with tarfile.open(fileobj=fh) as tf:
            for info in tf:
                # directory traversal shouldn't happen because we check the
                # tarball hash, but check anyway
                normalized = os.path.normpath(info.name)
                if (
                    normalized.startswith('/')
                    # a member that normalizes to exactly '..' escapes too
                    or normalized == '..'
                    or normalized.startswith('../')
                ):
                    raise OSError(f'Directory traversal: {info.name}')
                if info.isfile() or info.isdir():
                    tf.extract(info, path=basedir, set_attrs=False)
                elif info.issym():
                    (basedir / info.name).symlink_to(info.linkname)
                else:
                    raise OSError(f'Unexpected type: {info.name}')
    # Publish the unpacked tree only once it is complete
    FROZEN.symlink_to(link_target)


def _run_all(
    pattern: str = '*',
    valgrind: bool = False,
    xfail: Iterable[str] | None = None,
    progdir: Path | None = None,
    parallel: bool = True,
) -> int:
    '''Run every test whose name matches pattern and print each result.

    valgrind, progdir and the per-test expected-failure flag (true when
    the test's name appears in xfail) are forwarded to each test's run()
    method.  Tests run on a thread pool sized by _cpu_count(), or on a
    single worker when parallel is false.  Returns the number of tests
    whose result was not OK.'''
    cases = TestCase.list(pattern)
    # Unpack anything that has no frozen counterpart
    for case in cases:
        if not (FROZEN / case.name).exists():
            case.unpack()

    worker_count = _cpu_count() if parallel else 1
    pool = ThreadPoolExecutor(
        thread_name_prefix='test-runner-',
        max_workers=worker_count,
    )
    unexpected = 0
    try:
        expected_failures = set(xfail or [])
        pending = [
            pool.submit(
                case.run,
                valgrind,
                case.name in expected_failures,
                progdir,
                # Prefer the real test if available
                workdir=(
                    WORKROOT if (WORKROOT / case.name).exists() else FROZEN
                ),
            )
            for case in cases
        ]
        # Collect in submission order so output is stable
        for job in pending:
            ok, msg = job.result()
            print(msg)
            if not ok:
                unexpected += 1
    except BaseException:
        # In particular, KeyboardInterrupt
        pool.shutdown(cancel_futures=True)
        raise
    else:
        pool.shutdown()
    print(f'\nFailed: {unexpected}/{len(cases)}')
    return unexpected


@_command
def run(pattern: str = '*') -> None:
    '''Unpack and run all tests matching the specified pattern.  Ignore
    failures of test cases listed in the comma-separated
    OPENSLIDE_TEST_XFAIL environment variable.'''
    # An unset or empty variable means no expected failures
    raw_xfail = os.environ.get('OPENSLIDE_TEST_XFAIL')
    expected_failures = raw_xfail.split(',') if raw_xfail else []
    unexpected = _run_all(pattern, xfail=expected_failures)
    if unexpected:
        sys.exit(1)


@contextmanager
def _rebuild(
    setup_args: list[str], env: dict[str, str] | None = None
) -> Iterator[Path]:
    '''Context manager: configure and build the source tree in a fresh
    temporary build directory, then yield with that directory as the
    current working directory.

    setup_args are appended to `meson setup`; env, if given, is layered
    over the current environment for that command.  The yielded value is
    the directory that was current when the context was entered.  On a
    clean exit the original directory is restored and the build directory
    is deleted.'''

    original_cwd = Path.cwd()
    source_root = SRCDIR.parent

    # Set up and build
    build_dir = mkdtemp(prefix='build-', dir=original_cwd)
    os.chdir(source_root)
    child_env = {**os.environ, **(env or {})}
    subprocess.run(
        ['meson', 'setup', build_dir, *setup_args],
        check=True,
        env=child_env,
    )
    os.chdir(build_dir)
    subprocess.run(['ninja'], check=True)

    # Let the caller run, passing it the directory we came from.
    # Intentionally don't clean up the build directory on exception.
    yield original_cwd

    # Remove temporary directory
    os.chdir(original_cwd)
    rmtree(build_dir)


@_command
def coverage(outfile: str) -> None:
    '''Unpack and run all tests and write coverage report to outfile.'''
    # _rebuild() leaves us inside a temporary build tree configured with
    # -D_gcov=true; basedir is the directory we started from, against which
    # outfile is resolved.
    with _rebuild(['-D_gcov=true']) as basedir:
        # Run tests
        # Avoid races when writing coverage data
        _run_all(progdir=Path('test'), parallel=False)

        # Generate coverage reports
        for dirpath, _, filenames in os.walk('src'):
            paths = [
                Path(dirpath) / name
                for name in fnmatch.filter(sorted(filenames), '*.gcda')
            ]
            if paths:
                args: list[str | Path] = ['gcov', '-o', dirpath]
                args.extend(paths)
                subprocess.check_call(args)

        # Record unexecuted lines; ignore glib headers
        gcov_files = [
            f
            for f in fnmatch.filter(sorted(os.listdir('.')), '*.gcov')
            if not fnmatch.fnmatch(f, 'g*.h.gcov')
        ]
        if not gcov_files:
            # With no file operands grep would read standard input
            # instead, so fail with a clear message.
            raise OSError('No .gcov files were generated')
        args = ['grep', '-FC', '2', '#####']
        args.extend(gcov_files)
        proc = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
        )
        report_bytes, _ = proc.communicate()
        # grep exits with 0 when lines were selected, 1 when none were, and
        # a larger value on error.  "No unexecuted lines" is a valid (empty)
        # report, not a failure.
        if proc.returncode not in (0, 1):
            raise OSError(f'Process returned exit status {proc.returncode}')
        # Show source file names rather than gcov report names in the
        # per-line prefixes
        report = '\n'.join(
            line.replace('.c.gcov', '.c', 1)
            for line in report_bytes.decode().split('\n')
        )
        with open(basedir / outfile, 'w') as fh:
            fh.write(report)


@_command
def valgrind(pattern: str = '*') -> None:
    '''Unpack and Valgrind all tests matching the specified pattern.
    Ignore failures of test cases listed in the OPENSLIDE_VALGRIND_XFAIL
    environment variable.'''
    # The variable is split on commas; unset or empty means no expected
    # failures
    raw_xfail = os.environ.get('OPENSLIDE_VALGRIND_XFAIL')
    expected_failures = raw_xfail.split(',') if raw_xfail else []
    unexpected = _run_all(pattern, valgrind=True, xfail=expected_failures)
    if unexpected:
        sys.exit(1)


@_command
def sanitize(pattern: str = '*') -> None:
    '''Unpack and run all tests matching the specified pattern with
    clang sanitizers.  Ignore failures of test cases listed in the
    comma-separated OPENSLIDE_TEST_XFAIL environment variable.'''
    build_options = ['-D_sanitize=true']
    # Build with clang in a temporary tree, then run the test programs
    # from that tree's `test` directory.
    with _rebuild(build_options, env={'CC': 'clang'}):
        raw_xfail = os.environ.get('OPENSLIDE_TEST_XFAIL')
        expected_failures = raw_xfail.split(',') if raw_xfail else []
        unexpected = _run_all(
            pattern, xfail=expected_failures, progdir=Path('test')
        )
        if unexpected:
            sys.exit(1)


def _fetch_for_mosaic() -> None:
    '''Fetch the base slide named by the `base` option of every section
    in MOSAICLIST.'''
    parser = RawConfigParser()
    parser.read(MOSAICLIST)
    for section_name in parser.sections():
        base = parser.get(section_name, 'base')
        BaseSlide(PurePath(base)).fetch()


def _mosaic(outfile: Path, pristinedir: Path = PRISTINE) -> None:
    '''Run the `mosaic` program from BUILDDIR, passing it pristinedir,
    MOSAICLIST and outfile in that order.  Raises
    subprocess.CalledProcessError if the program exits with a nonzero
    status.'''
    command = [BUILDDIR / 'mosaic', pristinedir, MOSAICLIST, outfile]
    subprocess.check_call(command)


@_command
def mosaic(outfile: str) -> None:
    '''Produce a mosaic image of slide data from various formats.'''
    if not FROZEN.exists():
        # No frozen data: download the real slides and read them in place
        _fetch_for_mosaic()
        slide_source = PRISTINE
    else:
        # Read the slides out of the unpacked frozen archive
        slide_source = FROZEN / '_pristine'
    _mosaic(Path(outfile), slide_source)


def _successful_primary_tests(
    pattern: str = '*',
) -> Iterator[tuple[TestCase, UnpackedSlide]]:
    '''Unpack and yield (test case, unpacked slide) for every test matching
    pattern whose configuration is primary, expects success, and has its
    required features available.'''
    for test in TestCase.list(pattern):
        conf = test.conf
        if not (conf.primary and conf.success and conf.features_available):
            continue
        test.unpack()
        # An empty filename marks a synthetic test slide, which has no
        # file on disk
        slide_path = (
            None
            if conf.filename == ''
            else WORKROOT / test.name / conf.filename
        )
        yield test, UnpackedSlide(slide_path)


@_command
def time(pattern: str = '*') -> None:
    '''Time openslide_open() for all successful primary tests matching the
    specified pattern.'''
    for test, unpacked in _successful_primary_tests(pattern):
        proc = unpacked.run_prog(
            'try_open',
            args=['-t'],
            extra_checks=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            debug=['synthetic'],
        )
        stdout, stderr = proc.communicate()
        # A nonzero exit status or anything on stderr counts as a failure;
        # otherwise report whatever the program printed.
        result = 'failed' if proc.returncode or stderr else stdout
        print(f'{test.name:<40} {result.strip():<10}')


@_command
def exports() -> None:
    '''Report exported or hidden symbols with improper names.'''

    def get_symbols(basedir: Path) -> Iterator[str]:
        # Yield the name of every global symbol that `objdump -T` lists
        # for the shared library built under basedir.
        library = basedir / 'src' / 'libopenslide.so'
        proc = subprocess.Popen(
            ['objdump', '-T', library],
            stdout=subprocess.PIPE,
        )
        listing, _ = proc.communicate()
        if proc.returncode:
            raise OSError(f'objdump returned exit status {proc.returncode}')
        for line in listing.decode().splitlines():
            fields = line.split()
            # Keep only rows with enough columns whose second column flags
            # a global symbol; the symbol name is the last column.
            if len(fields) >= 3 and fields[1] == 'g':
                yield fields[-1]

    # Magic ELF symbols
    ignore_symbols = {'__bss_start', '_edata', '_end', '_fini', '_init'}

    # Check exported symbols
    exported_symbols = set(get_symbols(BUILDDIR.parent)) - ignore_symbols
    bad_exported = {
        name for name in exported_symbols if not name.startswith('openslide_')
    }

    # Check hidden symbols: rebuild with internal symbols exported, then
    # look at whatever appears in addition to the normal exports.  The
    # rebuilt tree is the current directory inside the context.
    with _rebuild(['-D_export_internal_symbols=true']):
        rebuilt_symbols = set(get_symbols(Path.cwd()))
        hidden_symbols = rebuilt_symbols - exported_symbols - ignore_symbols
        bad_hidden = {
            name
            for name in hidden_symbols
            if not name.startswith('_openslide_')
        }

    # Report
    for name in sorted(bad_exported):
        print('Badly-named exported symbol:', name, file=sys.stderr)
    for name in sorted(bad_hidden):
        print('Badly-named hidden symbol:', name, file=sys.stderr)
    if bad_exported or bad_hidden:
        sys.exit(1)


@_command
def clean(pattern: str = '*') -> None:
    '''Delete temporary slide data for tests matching the specified
    pattern.  If pattern is `frozen` or omitted, also delete unfrozen
    test data.'''
    for test in TestCase.list(pattern):
        workdir = WORKROOT / test.name
        if workdir.exists():
            rmtree(workdir)

    if pattern not in ('*', 'frozen'):
        return
    # lexists() so that a dangling FROZEN symlink is still cleaned up
    if not os.path.lexists(FROZEN):
        return
    # Resolve the link before removing it, then delete what it pointed at
    unfrozen_dir = FROZENBASE / os.readlink(FROZEN)
    FROZEN.unlink()
    if unfrozen_dir.exists():
        rmtree(unfrozen_dir)


@_command
def fuse() -> None:
    '''Mount the FUSE filesystem for debugging.'''
    # The shadow tree goes into a throwaway directory that is removed when
    # the filesystem stops running.
    with TemporaryDirectory(prefix='shadow-', dir=FROZENBASE) as shadow_root:
        shadowdir = Path(shadow_root)
        _fusefs_init(shadowdir)
        _fusefs_run()


def _get_arglist(f: Command) -> tuple[list[str], list[str]]:
    '''Return two lists of argument names for the specified function: the
    mandatory arguments and the optional ones.'''
    info = inspect.getfullargspec(f)
    if info.defaults:
        optcount = len(info.defaults)
        return info.args[:-optcount], info.args[-optcount:]
    else:
        return info.args, []


def _usage() -> None:
    '''Print a synopsis and the docstring of every command in _commands,
    then exit with status 2.'''
    indent = ' ' * 8
    filler = textwrap.TextWrapper(
        width=76, initial_indent=indent, subsequent_indent=indent
    )
    print('Usage:')
    for cmd_name in _commands:
        func = _command_funcs[cmd_name]
        required, optional = _get_arglist(func)
        # Mandatory arguments are shown as <name>, optional ones as [name]
        placeholders = [f'<{arg}>' for arg in required]
        placeholders.extend(f'[{arg}]' for arg in optional)
        synopsis = ' '.join(placeholders)
        print(f'    {cmd_name} {synopsis}')
        print(filler.fill(func.__doc__ or 'Undocumented.'))
        print()
    sys.exit(2)


def _main() -> None:
    if '@FEATURES@'.strip('@') == 'FEATURES':
        raise Exception('This program must be built with Meson before use.')
    try:
        cmd = sys.argv[1]
    except IndexError:
        _usage()
    try:
        f = _command_funcs[cmd]
    except KeyError:
        _usage()
    args, optargs = _get_arglist(f)
    argc = len(sys.argv) - 2
    if argc < len(args) or argc > len(args) + len(optargs):
        _usage()
    if not CACHE_TAG.exists():
        CACHE.mkdir(parents=True, exist_ok=True)
        with CACHE_TAG.open('w') as fh:
            fh.writelines(
                [
                    'Signature: 8a477f597d28d172789f06886806bc55\n',
                    "# This file is a cache directory tag created by OpenSlide's test suite.\n",  # noqa: E501
                    '# For information about cache directory tags, see https://bford.info/cachedir/\n',  # noqa: E501
                ]
            )
    f(*sys.argv[2:])


# Dispatch to a command only when executed as a script, not when imported.
if __name__ == '__main__':
    _main()
